package com.three.client;

import com.google.common.collect.Maps;
import com.three.api.connection.Connection;
import com.three.api.event.ConnectionCloseEvent;
import com.three.api.protocol.Packet;
import com.three.api.spi.common.CacheManager;
import com.three.api.spi.common.CacheManagerFactory;
import com.three.message.hall.CreateFriendRoomReqMessage;
import com.three.message.hall.JoinRoomReqMessage;
import com.three.message.hall.NotifyMemberJoinRoomMessage;
import com.three.message.hall.NotifyRoomInfoMessage;
import com.three.mahjong.base.model.MJGameType;
import com.three.mahjong.base.model.PayType;
import com.three.common.CacheKeys;
import com.three.common.message.base.*;
import com.three.common.message.mail.NotifyMailListMessage;
import com.three.common.message.player.NotifyPlayerInfoMessage;
import com.three.common.message.shop.ConsumeReqMessage;
import com.three.common.security.AesCipher;
import com.three.constant.Constants;
import com.three.event.EventBus;
import com.three.netty.client.connect.ClientConfig;
import com.three.netty.client.connect.TestStatistics;
import com.three.netty.connection.NettyConnection;
import com.three.protocol.CommandEnum;
import com.three.tools.thread.NamedPoolThreadFactory;
import com.three.tools.thread.ThreadNames;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Random;
import java.util.concurrent.TimeUnit;

import static com.three.protocol.CommandEnum.Command.NOTIFY_MEMBER_JOIN_ROOM;
import static com.three.protocol.CommandEnum.Command.NOTIFY_ROOM_INFO;

/**
 * Created by mathua on 2017/5/30.
 */
public final class ConnClientChannelHandler extends ChannelInboundHandlerAdapter {
    private static final Logger LOGGER = LoggerFactory.getLogger(ConnClientChannelHandler.class);
    private static final Timer HASHED_WHEEL_TIMER = new HashedWheelTimer(new NamedPoolThreadFactory(ThreadNames.T_CONN_TIMER));
    public static final AttributeKey<ClientConfig> CONFIG_KEY = AttributeKey.newInstance("clientConfig");
    public static final TestStatistics STATISTICS = new TestStatistics();
    private static CacheManager cacheManager = CacheManagerFactory.create();

    private final Connection connection = new NettyConnection();

    private ClientConfig clientConfig;
    private boolean perfTest;
    private int hbTimeoutTimes;

    public ConnClientChannelHandler() {
        perfTest = true;
    }

    public ConnClientChannelHandler(ClientConfig clientConfig) {
        this.clientConfig = clientConfig;
    }

    public Connection getConnection() {
        return connection;
    }

    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        connection.updateLastReadTime();
        if (msg instanceof Packet) {
            Packet packet = (Packet) msg;
            CommandEnum.Command command = CommandEnum.Command.valueOf(packet.cmd);
            if (command == CommandEnum.Command.LOGIN) {
                int connectedNum = STATISTICS.connectedNum.incrementAndGet();
                connection.getSessionContext().changeCipher(new AesCipher(clientConfig.getClientKey(), clientConfig.getIv()));
                LoginRespMessage message = new LoginRespMessage(packet, connection);
                message.decodeBody();
                startHeartBeat(message.getHeartbeat() - 1000);
                testConsumeReq();
                LOGGER.info("handshake success, clientConfig={}, connectedNum={}, message={}", clientConfig, connectedNum, message);
            } else if (command == CommandEnum.Command.FAST_CONNECT) {
                int connectedNum = STATISTICS.connectedNum.incrementAndGet();
                String cipherStr = clientConfig.getCipher();
                String[] cs = cipherStr.split(",");
                byte[] key = AesCipher.toArray(cs[0]);
                byte[] iv = AesCipher.toArray(cs[1]);
                connection.getSessionContext().changeCipher(new AesCipher(key, iv));

                FastConnectRespMessage message = new FastConnectRespMessage(packet, connection);
                message.decodeBody();
                connection.getSessionContext().setHeartbeat(message.getHeartbeat());
                startHeartBeat(message.getHeartbeat() - 1000);
                LOGGER.info("fast connect success, clientConfig={}, connectedNum={}", clientConfig, connectedNum);
            } else if (command == CommandEnum.Command.KICK) {
                KickPlayerMessage message = new KickPlayerMessage(packet, connection);
                LOGGER.error("receive kick player msg openId={}, deviceId={}, message={},", clientConfig.getOpenId(), clientConfig.getDeviceId(), message);
                ctx.close();
            } else if (command == CommandEnum.Command.ERROR) {
                ErrorMessage message = new ErrorMessage(packet, connection);
                message.decodeBody();
                LOGGER.error("receive an error packet=" + message);
            } else if (command == CommandEnum.Command.PUSH) {
                int receivePushNum = STATISTICS.receivePushNum.incrementAndGet();

                PushMessage message = new PushMessage(packet, connection);
                message.decodeBody();
                LOGGER.info("receive push message, content={}, receivePushNum={}, packet={}"
                        , message.getContent(), receivePushNum, message);

                if (message.needAck()) {
                    AckMessage.from(message).sendRaw();
                    LOGGER.info("send ack success for sessionId={}", message.getSessionId());
                }

            } else if (command == CommandEnum.Command.HEARTBEAT) {
                LOGGER.info("receive heartbeat pong...");
            } else if (command == CommandEnum.Command.OK) {
                OkMessage message = new OkMessage(packet, connection);
                message.decodeBody();
                int bindPlayerNum = STATISTICS.bindPlayerNum.get();
                if (message.getCmd() == CommandEnum.Command.LOGIN.getNumber()) {
                    bindPlayerNum = STATISTICS.bindPlayerNum.incrementAndGet();
                }

                LOGGER.info("receive {}, bindPlayerNum={}", message, bindPlayerNum);

            } else if (command == CommandEnum.Command.HTTP_PROXY) {
                HttpResponseMessage message = new HttpResponseMessage(packet, connection);
                message.decodeBody();
                LOGGER.info("receive http response, message={}, body={}",
                        message, message.getBody() == null ? null : new String(message.getBody(), Constants.UTF_8));
            } else if(command == CommandEnum.Command.NOTIFY_PLAYER_INFO) {
                NotifyPlayerInfoMessage message = new NotifyPlayerInfoMessage(packet, connection);
                message.decodeBody();
                LOGGER.info("receive http response, message={}", message);
//                createRoomReq();
                joinRoomReq();
            } else if(command == CommandEnum.Command.NOTIFY_MAIL_LIST) {
                NotifyMailListMessage message = new NotifyMailListMessage(packet, connection);
                message.decodeBody();
            } else if(command == NOTIFY_ROOM_INFO) {
                NotifyRoomInfoMessage message = new NotifyRoomInfoMessage(packet, connection);
                message.decodeBody();
                LOGGER.info("receive notify room info, message={}", message);
            } else if(command == NOTIFY_MEMBER_JOIN_ROOM) {
                NotifyMemberJoinRoomMessage message = new NotifyMemberJoinRoomMessage(packet, connection);
                message.decodeBody();
                LOGGER.info("receive notify room info, message={}", message);
            }
        }

        LOGGER.debug("receive package={}, chanel={}", msg, ctx.channel());
    }

    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
        connection.close();
        LOGGER.error("caught an ex, channel={}", ctx.channel(), cause);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        int clientNum = STATISTICS.clientNum.incrementAndGet();
        LOGGER.info("client connect channel={}, clientNum={}", ctx.channel(), clientNum);

        for (int i = 0; i < 3; i++) {
            if (clientConfig != null) break;
            clientConfig = ctx.channel().attr(CONFIG_KEY).getAndSet(null);
            if (clientConfig == null) TimeUnit.SECONDS.sleep(1);
        }

        if (clientConfig == null) {
            throw new NullPointerException("client config is null, channel=" + ctx.channel());
        }

        connection.init(ctx.channel(), true);
        if (perfTest) {
            handshake();
        } else {
            tryFastConnect();
        }
    }

    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        int clientNum = STATISTICS.clientNum.decrementAndGet();
        connection.close();
        EventBus.I.post(new ConnectionCloseEvent(connection));
        LOGGER.info("client disconnect channel={}, clientNum={}", connection, clientNum);
    }

    private void tryFastConnect() {

        Map<String, String> sessionTickets = getFastConnectionInfo(clientConfig.getDeviceId());

        if (sessionTickets == null) {
            handshake();
            return;
        }
        String sessionId = sessionTickets.get("sessionId");
        if (sessionId == null) {
            handshake();
            return;
        }
        String expireTime = sessionTickets.get("expireTime");
        if (expireTime != null) {
            long exp = Long.parseLong(expireTime);
            if (exp < System.currentTimeMillis()) {
                handshake();
                return;
            }
        }

        final String cipher = sessionTickets.get("cipherStr");

        FastConnectReqMessage message = new FastConnectReqMessage(connection);
        message.setDeviceId(clientConfig.getDeviceId());
        message.setSessionId(sessionId);

        message.sendRaw(channelFuture -> {
            if (channelFuture.isSuccess()) {
                clientConfig.setCipher(cipher);
            } else {
                handshake();
            }
        });
        LOGGER.debug("send fast connect message={}", message);
    }

    private void saveToRedisForFastConnection(ClientConfig client, String sessionId, Long expireTime, byte[] sessionKey) {
        Map<String, String> map = Maps.newHashMap();
        map.put("sessionId", sessionId);
        map.put("expireTime", expireTime + "");
        map.put("cipherStr", connection.getSessionContext().getCipher().toString());
        String key = CacheKeys.getDeviceIdKey(client.getDeviceId());
        cacheManager.set(key, map, 60 * 5); //5分钟
    }

    @SuppressWarnings("unchecked")
    private Map<String, String> getFastConnectionInfo(String deviceId) {
        String key = CacheKeys.getDeviceIdKey(deviceId);
        return cacheManager.get(key, Map.class);
    }

    private void joinRoomReq() {
        JoinRoomReqMessage message = JoinRoomReqMessage.from(connection);
        message.setRoomId(14);
        message.setGameType(MJGameType.GUANG_DONG_MA_JIANG.id);
        message.send();
    }

    private void createRoomReq() {
        CreateFriendRoomReqMessage message = CreateFriendRoomReqMessage.from(connection);
        message.setGameType(MJGameType.GUANG_DONG_MA_JIANG.id);
        message.setPayType(PayType.MEMBERS_PAY.id);
        message.setMaxRounds(8);
        message.setBaseScore(1);
        message.setCeilingScore(3);
        message.send();
    }
    private void testConsumeReq(){
        ConsumeReqMessage message=new ConsumeReqMessage(new Packet(CommandEnum.Command.COMSUME_REQ),connection);
        message.setItemId(1);
        message.setChannelId(1);
        message.setNum(10086);
        message.send();
    }

    private void handshake() {
        LoginReqMessage message = new LoginReqMessage(connection);
        Random rand = new Random();
        message.setOpenId(/*"test_" + rand.nextInt(1000000)*/"r-2");
//        message.setClientKey(clientConfig.getClientKey());
//        message.setIv(clientConfig.getIv());
        message.setClientVersion(/*clientConfig.getClientVersion()*/"1.0.0");
        message.setDeviceId(/*clientConfig.getDeviceId()*/"d207fb879710ee0bb1e111395572a51ebe6a2005");
        message.setOsName(/*clientConfig.getOsName()*/"android");
        message.setOsVersion(/*clientConfig.getOsVersion()*/"1.0.0");
        message.setPlatform("android");
        message.setTimestamp(/*System.currentTimeMillis()*/1497702694);
        message.send();
        LOGGER.debug("send handshake message={}", message);
    }

    private void startHeartBeat(final int heartbeat) throws Exception {
        HASHED_WHEEL_TIMER.newTimeout(new TimerTask() {
            @Override
            public void run(Timeout timeout) throws Exception {
                if (connection.isConnected() && healthCheck()) {
                    HASHED_WHEEL_TIMER.newTimeout(this, heartbeat, TimeUnit.MILLISECONDS);
                }
            }
        }, heartbeat, TimeUnit.MILLISECONDS);
    }

    private boolean healthCheck() {

        if (connection.isReadTimeout()) {
            hbTimeoutTimes++;
            LOGGER.warn("heartbeat timeout times={}, client={}", hbTimeoutTimes, connection);
        } else {
            hbTimeoutTimes = 0;
        }

        if (hbTimeoutTimes >= 2) {
            LOGGER.warn("heartbeat timeout times={} over limit={}, client={}", hbTimeoutTimes, 2, connection);
            hbTimeoutTimes = 0;
            connection.close();
            return false;
        }

        if (connection.isWriteTimeout()) {
            LOGGER.info("send heartbeat ping...");
            connection.send(new Packet(CommandEnum.Command.HEARTBEAT));
        }

        return true;
    }
}
